/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package studio.raptor.ddal.common.algorithm;

import com.google.common.base.Splitter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;

import studio.raptor.ddal.common.exception.GenericException;
import studio.raptor.ddal.common.exception.code.RouteErrCodes;

/**
 * 两阶段路由抽象类。
 *
 * 如若使用两阶段路由算法，需使用者实现
 *
 * @author Sam
 * @since 3.0.0
 */
public abstract class HashRangeSingleKeyShardAlgorithm<T extends Comparable<?>> implements SingleKeyShardAlgorithm<T> {

    private static Logger logger = LoggerFactory.getLogger(HashRangeSingleKeyShardAlgorithm.class);
    private final Object lockObject = new Object();
    private Integer mod;
    private List<Range> ranges;

    /**
     * 根据分片值计算hash
     *
     * @param shardValue 分片值
     * @return hash
     */
    public abstract int hashShardValue(T shardValue);

    /**
     * 计算范围分片
     *
     * @param shardValue 分片值
     * @return hash
     */
    public abstract int rangeShardValue(T shardValue);

    protected int getModValue() {
        if (mod == null) {
            synchronized (lockObject) {
                if (mod == null) {
                    try {
                        Properties hashRangeProperties = new Properties();
                        hashRangeProperties.load(HashRangeSingleKeyShardAlgorithm.class.getClassLoader().getResourceAsStream("ddal/hash-range.properties"));
                        String modPlainText = hashRangeProperties.getProperty("mod");
                        if (null != modPlainText && modPlainText.matches("\\d+")) {
                            this.mod = Integer.parseInt(modPlainText);
                        } else {
                            throw new GenericException(RouteErrCodes.ROUTE_427);
                        }
                    } catch (IOException ioe) {
                        logger.error("Read hash range prop failed.", ioe);
                        throw new GenericException(RouteErrCodes.ROUTE_428);
                    }
                }
            }
        }
        return mod;
    }

    protected List<Range> getRanges() {
        if (null == ranges) {
            synchronized (lockObject) {
                if (null == ranges) {
                    try {
                        ranges = new ArrayList<>();
                        Properties hashRangeProperties = new Properties();
                        hashRangeProperties.load(HashRangeSingleKeyShardAlgorithm.class.getClassLoader().getResourceAsStream("ddal/hash-range.properties"));
                        String rangeText = hashRangeProperties.getProperty("range");
                        if (null != rangeText && rangeText.length() > 0 && rangeText.indexOf('-') > 0) {
                            Iterable<String> rangeMetaArray = Splitter.on(",").trimResults().split(rangeText);
                            for (String rangeMeta : rangeMetaArray) {
                                List<String> rangeItem = Splitter.on("-").trimResults().splitToList(rangeMeta);
                                if (rangeItem.size() == 2) {
                                    ranges.add(new Range(Integer.parseInt(rangeItem.get(0)), Integer.parseInt(rangeItem.get(1))));
                                } else {
                                    throw new GenericException(RouteErrCodes.ROUTE_429, new Object[]{rangeMeta});
                                }
                            }
                        } else {
                            throw new GenericException(RouteErrCodes.ROUTE_429, new Object[]{""});
                        }
                    } catch (IOException ioe) {
                        logger.error("Read hash range prop failed.", ioe);
                        throw new GenericException(RouteErrCodes.ROUTE_428);
                    }
                }
            }
        }
        return ranges;
    }

    @Override
    public String doEqual(Collection<String> shards, ShardValue<T> shardValue) {
        T value = shardValue.getValue();
        String suffix = hashShardValue(value) + "" + rangeShardValue(value);
        for (String shard : shards) {
            if (shard.endsWith(suffix)) {
                return shard;
            }
        }
        throw new UnsupportedOperationException();
    }

    @Override
    public Collection<String> doIn(Collection<String> shardings, ShardValue<T> shardValue) {
        Collection<String> r = new HashSet<>();
        for (T value : shardValue.getValues()) {
            String suffix = hashShardValue(value) + "" + rangeShardValue(value);
            for (String shard : shardings) {
                if (shard.endsWith(suffix)) {
                    r.add(shard);
                }
            }
        }
        return r;
    }

    @Override
    public Collection<String> doBetween(Collection<String> shardings, ShardValue<T> shardValue) {
        throw new UnsupportedOperationException();
    }

    public static class Range {
        private int start;
        private int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        public boolean inRange(int value) {
            return value >= start && value <= end;
        }
    }
}
